李守中
该站已迁往根域名 https://lishouzhong.com
需要注意,迁移后的文章的 url 可能会发生变化。
域名 https://note.lishouzhong.com 下的内容将不再更新,但已有内容会永久保留。

Java NIO

Table of Contents

1. 基础

根据 UNIX 网络编程对 I/O 模型的分类可以归纳出 5 种模型:

  • 阻塞 I/O:
    • 用户进程在拿到数据前,一直等待 (占用 CPU 以及硬盘 I/O 通道等)。
  • 非阻塞 I/O (数据没准备好可以做其他事情,提升了单个进程的质量):
    • 如果 kernel 中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个 error。
    • 从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。
    • 用户进程收到结果是一个 error 时,知道数据还没有准备好,于是就可以在下次再进行 read 操作之前做其他事情,或者直接再次发送 read 操作。
    • 一旦 kernel 中的数据准备好,并再次收到用户进程的 system call,那么它马上将数据拷贝到用户进程的内存 (这一阶段仍然阻塞),然后返回。
  • I/O 多路复用 (主要用在网络 I/O 上,牺牲单个 I/O 质量,提升可处理的 I/O 数量):
    • 所有用户进程统一把 I/O 请求提交到同一个地方。
    • 用户进程提交 I/O 请求后被阻塞。
    • 数据准备好后通知 (激活) 用户进程自己取。
  • 异步 I/O:
    • 所有用户进程统一把 I/O 请求提交到一个地方 (这一步与 I/O 多路复用相同)。
    • 当用户进程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内。
    • 内核在 I/O 完成后通知用户线程直接使用即可。
  • 信号驱动 I/O (不实用,跳过)

JAVA 中的 I/O 可分为两类:

  • 普通 JAVA I/O:
    • 面向流 Stream Oriented
    • 阻塞式 I/O blocking I/O (仅限网络 I/O)
  • NIO:
    • 面向缓冲区 Buffer Oriented
    • 非阻塞式 I/O Non Blocking I/O (仅限网络 I/O)
    • 选择器 Selectors (仅限网络 I/O)

简单理解: 普通 IO 使用面向流的处理方式,NIO 使用面向缓冲块的处理方式。

面向流的 I/O 一次一个字节地处理数据。面向缓冲块的 I/O 以缓冲块为单位处理数据。

NIO 主要由三个核心部分组成:

  1. Buffer 缓冲区
  2. Channel 数据管道
  3. Selector 选择器

java 的普通 I/O 已经被 NIO 重写过了,所以可以不必显式地使用 NIO。

2. 文件 I/O

2.1. 缓冲区与数据管道

NIO 使用 Buffer 缓冲区和 Channel 数据管道配合来处理数据,不以流的方式处理数据。

Buffer 中的数据通过 Channel 传输到输出端。Channel 不处理数据,它只负责运输数据。

传统 I/O 的流是单向的,NIO 有 Channel 这个概念,可双向读写。

2.2. Buffer 缓冲区

Buffer 类是缓冲区的抽象类。~ByteBuffer~ 是最常用的实现类,用于读写字节数据。

Buffer 类维护了 4 个核心变量来提供关于数据区的信息:

  1. 容量 Capacity
    • 数据区能够容纳的数据元素的最大数量。容量在缓冲区创建时被设定,并且永远不能被改变 (底层是数组)。
  2. 上界 Limit
    • 数据区里的数据的总数,代表了当前缓冲区中一共有多少数据。
  3. 位置 Position
    • 下一个要被读或写的元素的位置。 Position 会自动由相应的 get()put() 函数更新。
  4. 标记 Mark
    • 用于记录上一次读写的位置。

代码演示:

// 创建一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 看一下初始时4个核心变量的值
System.out.println("初始时-->limit--->"+byteBuffer.limit());
System.out.println("初始时-->position--->"+byteBuffer.position());
System.out.println("初始时-->capacity--->"+byteBuffer.capacity());
System.out.println("初始时-->mark--->" + byteBuffer.mark());

System.out.println("--------------------------------------");

// 添加一些数据到缓冲区中
String s = "Gridsah";
byteBuffer.put(s.getBytes());

// 看一下初始时4个核心变量的值
System.out.println("put完之后-->limit--->"+byteBuffer.limit());
System.out.println("put完之后-->position--->"+byteBuffer.position());
System.out.println("put完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("put完之后-->mark--->" + byteBuffer.mark());

System.out.println("--------------------------------------");

// flip() 方法
byteBuffer.flip();
System.out.println("flip完之后-->limit--->"+byteBuffer.limit());
System.out.println("flip完之后-->position--->"+byteBuffer.position());
System.out.println("flip完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("flip完之后-->mark--->" + byteBuffer.mark());

System.out.println("--------------------------------------");

// get() 方法
// 创建一个 limit 大小的字节数组
byte[] bytes = new byte[byteBuffer.limit()];
// 将读取的数据装进字节数组中
byteBuffer.get(bytes);
// 输出数据
System.out.println(new String(bytes, 0, bytes.length));
System.out.println("get完之后-->limit--->"+byteBuffer.limit());
System.out.println("get完之后-->position--->"+byteBuffer.position());
System.out.println("get完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("get完之后-->mark--->" + byteBuffer.mark());

输出如下:

初始时-->limit--->1024
初始时-->position--->0
初始时-->capacity--->1024
初始时-->mark--->java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
--------------------------------------
put完之后-->limit--->1024
put完之后-->position--->7
put完之后-->capacity--->1024
put完之后-->mark--->java.nio.HeapByteBuffer[pos=7 lim=1024 cap=1024]
--------------------------------------
flip完之后-->limit--->7
flip完之后-->position--->0
flip完之后-->capacity--->1024
flip完之后-->mark--->java.nio.HeapByteBuffer[pos=0 lim=7 cap=1024]
--------------------------------------
Gridsah
get完之后-->limit--->7
get完之后-->position--->7
get完之后-->capacity--->1024
get完之后-->mark--->java.nio.HeapByteBuffer[pos=7 lim=7 cap=1024]

NIO 给了一个 flip() 方法从缓存区拿数据: 这个方法改动了 positionlimit 的位置。

调用完 filp() 后: position 是开始读的位置,而 limit 限制读到哪里。

一般称 filp() 的作用为 切换成读模式 。每当从缓存区读取数据时它就被调用。

切换成读模式之后,可以用 get() 读取数据,读取完之后有 position = limit

2.3. FileChannel 通道

Channel 只负责传输数据,所有数据操应作用于 BufferChannel 接口的主要实现类有:

  1. FileChannel
  2. SocketChannel
  3. ServerSocketChannel
  4. DatagramChannel

获取通道代码演示:

// 1. 通过本地 IO 的方式来获取通道
FileInputStream fileInputStream = new FileInputStream("F:\\JavaEE常用框架\\wtf.md");
// 得到文件的输入通道
FileChannel inChannel = fileInputStream.getChannel();

// 2. jdk1.7 后通过静态方法 open() 获取通道
FileChannel.open(Paths.get("F:\\JavaEE常用框架\\wtf.md"), StandardOpenOption.WRITE);

使用 FileChannel 配合 Buffer 实现文件复制代码演示:

try {
    // 创建通道
    FileChannel inChannel = new FileInputStream("1.jpg").getChannel();
    FileChannel outChannel = new FileInputStream("2.jpg").getChannel();
    // 创建缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);
    // 将通道中的数据存入缓冲区
    while(inChannel.read(buf) != -1){
        buf.flip(); // 切换读模式
        outChannel.write(buf); // 将缓冲区数据传入通道
        buf.clear(); // 清空缓冲区
    }
} catch (IOException e) {
    e.printStackTrace();
}

使用 内存映射文件 实现文件复制 (直接操作缓冲区) 代码演示:

FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);

// 映射内存
MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_Write, 0, inChannel.size());

// 直接对缓冲区进行读写
byte[] dst = new Byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);

通道间通过 transfer() 实现数据传输 (直接操作缓冲区) 代码演示:

FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);

inChannel.transferTo(0, inChannel.size(), outChannel);

2.4. 直接与非直接缓存区

将数据读入缓缓存时,缓存区可以有两种情况:

  1. 非直接缓存区:
    1. 程序所有读写操作必须经过一个从内核空间 (OS) 到用户空间 (JVM) 的 copy 阶段。
    2. 内存中有两个相同的缓存区块,一个属于内核空间,一个属于用户空间。
    3. 优点: 安全。缺点: 速度极慢。
  2. 直接缓冲区:
    1. 不需要经过 copy 阶段。
    2. 内存中只有一个缓存区块,内核 (OS) 与用户空间 (JVM) 共享这个缓存区块。
    3. 优点: 速度快。缺点: 创建和回收代价高。不安全。大文件操作时 CPU 占用极高。

创建非直接缓存区时,用 allocate() 工厂方法分配空间,将缓冲区建立在 JVM 的内存中。

直接缓存区创建有两种方式:

  1. 创建缓存时用 allocateDirect() 工厂方法分配空间。
  2. FileChannel 上调用 map() 方法,将文件直接映射到内存中创建。

缓存区可以调用 isDirect() 方法来区分直接与非直接。

2.5. scatter 和 gather 字符集

分散读取 scatter: 将读取到的数据 依次 填满多个缓存区。

聚集写入 gather: 将多个缓存区中的数据 按顺序 集中写入到一个通道中。

分散读取与聚集写入代码演示:

FileChannel channelA = new FileInputStream("F:\\wtf.md").getChannel();
FileChannel channelB = new RandomAccessFile("2.txt", "rw").getChannel();

ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
ByteBuffer buf3 = ByteBuffer.allocate(100);
ByteBuffer buf4 = ByteBuffer.allocate(1024);

ByteBuffer[] bufsA = {buf1, buf2};
ByteBuffer[] bufsB = {buf3, buf4};
// 分散读取
channelA.read(bufsA);
// 聚集写入
channelB.write(bufsB);

for (ByteBuffer[] byteBuffer : bufs) {
    byteBuffer.flip();
}

System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("-----------------")
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()))

2.6. 字符集

重点在于编码格式和解码格式一致:

Charset csA = Charset.forName("GBK");
// 获取编码器
CharsetEncoder ce = csA.newEncoder();
// 获取解码器
CharsetDecoder cd = csA.newDecoder();
// 创建字符缓存区
CharBuffer cBuf1 = CharBuffer.allocate(1024);
//放入字符
cBuf1.put("what?");
cBuf1.flip();

// 对字符编码
ByteBuffer bBuf = ce.encode(cBuf1);
for (int i = 0; bBuf < 5; i++) {
    System.out.print("["+bBuf.get() + "]");
}

System.out.print("\n");

// 对字符解码
bBuf.flip();
CharBuffer cBuf2 = cd.decode(bBuf);
for (int i = 0; i < cBuf2.limit(); i++) {
    System.out.print("["+cBuf2.get() + "]");
}

输出如下:

[119][104][97][116][63]
[w][h][a][t][?]

3. 网络 I/O

NIO (non blocking I/O) 是在网络层理解的,非阻塞的特点也是网络 I/O 中体现。

NIO 在网络 I/O 中采用多路复用的 I/O 模型,对于操作文件的 FileChannel 来说依旧采用阻塞式的 I/O 模型。

SelectableChannel 是网络通信常用的 Channel 接口的实现,它的子类有:

  • SocketChannel
  • ServerSocketChannel
  • DatagramChannel
  • Pipe.SinkChannel
  • Pipe.SourceChannel

3.1. 阻塞式网络 I/O

阻塞式 I/O 模型不需要 Selector 选择器参与,代码演示:

// 客户端
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 6666));
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\a.txt"), StandardOpenOption.READ);
// 3. 创建缓存区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 4. 读取本地文件并发送出去
while (fileChannel.read(buffer) != -1) {
    buffer.flip();
    socketChannel.write(buffer);
    buffer.clear();
}
// 告诉服务端已经发完数据了
socketChannel.shutdownOoutput();
// 等待服务器的传输完成响应
int len = 0;
while((len = socketChannel.read(buffer)) != -1) {
    buffer.flip();
    System.out.println(new String(buffer.array(), 0, len));
    buffer.clear();
}
// 5. 关闭通道
fileChannel.close();
socketChannel.close();
// 服务端

// 1. 获取通道
ServerSocketChannel server = ServerSocketChannel.open();
// 2. 创建写入数据的通道
FileChannel outChannel = FileChannel.open(Paths.get("b.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE)
// 3. 绑定 socket 连接
server.bind(new InetSocketAddress("localhost",6666));
// 4. 获取客户端连接
SocketChannel socketChannel = server.accept();
// 5. 创建缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 6. 将客户端传递过来的文件保存到本地
while (client.read(buffer) != -1) {
    buffer.flip();
    outChannel.write(buffer);
    buffer.clear();
}
// 服务端收完数据后,通知客户端
buffer.put("success".getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
// 7.关闭通道
outChannel.close();
socketChannel.close();
server.close();

3.2. NIO 非阻塞

使用非阻塞模式可以使客户端不用显式通知服务器数据发送完毕。代码演示:

// 客户端:
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
// 切换成非阻塞模式
socketChannel.configureBlocking(false);
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\1.png"), StandardOpenOption.READ);
// 3. 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4.读取本地文件发送到服务器
while (fileChannel.read(buffer) != -1) {
    buffer.flip();
    socketChannel.write(buffer);
    buffer.clear();
}
// 5. 关闭流
fileChannel.close();
socketChannel.close();
// 服务端
// 1.获取通道
ServerSocketChannel server = ServerSocketChannel.open();
// 2.切换成非阻塞模式
server.configureBlocking(false);
// 3. 绑定连接
server.bind(new InetSocketAddress("localhost", 6666));
// 4. 获取选择器
Selector selector = Selector.open();
// 将通道注册到选择器上,指定接收 监听通道 事件
server.register(selector, SelectionKey.OP_ACCEPT);
// 5. 轮询地获取选择器上 已就绪 的事件
// 有事件已就绪就开始处理 select()>0 为已就绪
while (selector.select() > 0) {
    // 6. 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    // 7. 处理所有 已就绪 的事件
    while (iterator.hasNext()) {
        // 按顺序接收 就绪事件
        SelectionKey selectionKey = iterator.next();
        // 不同的事件做不同的事
        if (selectionKey.isAcceptable()) { // 建立连接 事件就绪
            // 获取客户端的链接
            SocketChannel socketChannel = server.accept();
            // 把连接切换成非阻塞状态
            socketChannel.configureBlocking(false);
            // 把连接注册到选择器上,监听 读就绪 事件
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) { // 读 事件就绪
            // 获取当前选择器读就绪状态的通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            // 创建读数据的缓存区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // 创建写入数据的通道(写模式、文件不存在则创建)
            FileChannel outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
            while (socketChannel.read(buffer) > 0) {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
            }
            // 文件保存完后通知客户端
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            writeBuffer.put("success".getBytes());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
        }
        // 移除已经处理过的选择键(事件)
        iterator.remove();
    }
}

但是在非阻塞模式下,客户端要获取服务端的数据,也要在 Selector 上注册,监听读事件。代码演示:

// 客户端:
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
// 切换成非阻塞模式
socketChannel.configureBlocking(false);
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\1.png"), StandardOpenOption.READ);
// 3. 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 4. 获取选择器
Selector selector = Selector.open();
// 5. 将通道注册到选择器中,获取服务端返回的数据
socketChannel.register(selector, SelectionKey.OP_READ);

// 6. 读取本地文件发送到服务器
while (fileChannel.read(buffer) != -1) {
    buffer.flip();
    socketChannel.write(buffer);
    buffer.clear();
}

// 7. 轮询地获取选择器上 已就绪 的事件
// 有事件已就绪就开始处理 select()>0 为已就绪
while (selector.select() > 0) {
    // 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    // 处理所有 已就绪 的事件
    while (iterator.hasNext()) {
        // 按顺序接收 就绪事件
        SelectionKey selectionKey = iterator.next();
        // 不同的事件做不同的事
        if (selectionKey.isReadable()) { // 读事件就绪
            // 获取当前选择器读就绪状态的通道
            SocketChannel tempSocketChannel = (SocketChannel) selectionKey.channel();
            // 创建读数据的缓存区
            ByteBuffer tempByteBuffer = ByteBuffer.allocate(1024);
            // 知道服务端要返回响应的数据给客户端,客户端在这里接收
            int readBytes = tempSocketChannel.read(tempByteBuffer);
            if (readBytes > 0) {
                tempByteBuffer.flip();
                System.out.println(new String(tempByteBuffer.array(), 0, readBytes));
            }
        }
        // 移除已经处理过的选择键(事件)
        iterator.remove();
    }
}

3.3. UDP 传输 DatagramChannel

代码演示:

// 客户端
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);

while(input.hasNext()){
    String str = input.nextLine();
    buf.put((new Date().toString() + ":" + str).getBytes());
    buf.flip();
    dc.send(buf, new InetSocketAddress("127.0.0.1", 8989));
    buf.clear();
}

datagramChannel.close();
// 服务端
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress(8989));
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
while (selector.select() > 0) {
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        SelectionKey selectionKey = iterator.next();
        if (selectionKey.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            datagramChannel.receive(buffer);
            buffer.flip();
            System.out.println(new String(buffer.array(), 0, buffer.limit()));
            buffer.clear();
        }
    }
    iterator.remove();
}

3.4. 管道

管道是两个线程之间的 单向连接

Pipe 有一个 source 通道和一个 sink 通道。数据会被写入 sink 通道,从 source 通道获取。

代码演示:

// 获取管道
Pipe pipe = Pipe.open();

// 将数据写入缓存区并发送
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("one way data transmission.".getBytes());
buf.flip();
sinkChannel.write(buf);
buf.clear();

// 读取缓存区的数据
Pipe.SourceChannel sourceChannel = pipe.source();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));

sourceChannel.close();
sinkChannel.close();


Last Update: 2023-08-13 Sun 14:23

Generated by: Emacs 28.2 (Org mode 9.5.5)   Contact: lsz.sino@outlook.com

若正文中无特殊说明,本站内容遵循: 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议